package com.xiaofan.ct.consumer.bean;

import com.xiaofan.ct.common.bean.Consumer;
import com.xiaofan.ct.common.constant.Names;
import com.xiaofan.ct.consumer.dao.HbaseDao;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * 通话记录消费者
 */
public class CallLogConsumer implements Consumer {

    @Override
    public void consume() {

        try {
            // 创建配置对象
            Properties prop = new Properties();
            prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties"));

            // 获取flume采集的数据
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
            // 关注主题
            consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));

            // Hbase数据访问对象
            HbaseDao hbaseDao = new HbaseDao();
            hbaseDao.init();

            // 消费数据
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    // hbaseDao.insertData(consumerRecord.value());
                    CallLog log = new CallLog(consumerRecord.value());
                    hbaseDao.insertData(log);
                }
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws IOException {

    }
}
